/*
  Copyright (C) 2009-2018  Brazil
  Copyright (C) 2018-2022  Sutou Kouhei <kou@clear-code.com>

  This library is free software; you can redistribute it and/or
  modify it under the terms of the GNU Lesser General Public
  License as published by the Free Software Foundation; either
  version 2.1 of the License, or (at your option) any later version.

  This library is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with this library; if not, write to the Free Software
  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
*/

#include "grn.h"

#include <stdio.h>
#include <string.h>
#include "grn_ctx_impl.h"

#ifdef WIN32
#  include <ws2tcpip.h>
#else
#  ifdef HAVE_SYS_SOCKET_H
#    include <sys/socket.h>
#  endif /* HAVE_SYS_SOCKET_H */
#  include <netinet/in.h>
#  include <netinet/tcp.h>
#  ifdef HAVE_SIGNAL_H
#    include <signal.h>
#  endif /* HAVE_SIGNAL_H */
#  include <sys/uio.h>
#endif /* WIN32 */

#include "grn_ctx.h"
#include "grn_com.h"

/* Define PF_INET as AF_INET when the headers included above do not define it. */
#ifndef PF_INET
#  define PF_INET AF_INET
#endif /* PF_INET */

/*
 * Define SOL_TCP when it is missing: use IPPROTO_TCP if that is available,
 * otherwise fall back to the literal value 6.
 */
#ifndef SOL_TCP
#  ifdef IPPROTO_TCP
#    define SOL_TCP IPPROTO_TCP
#  else
#    define SOL_TCP 6
#  endif /* IPPROTO_TCP */
#endif   /* SOL_TCP */

/*
 * Unless the build defines USE_MSG_MORE, MSG_MORE is forced to 0 so that
 * OR-ing it into send flags has no effect.
 */
#ifndef USE_MSG_MORE
#  ifdef MSG_MORE
#    undef MSG_MORE
#  endif
#  define MSG_MORE 0
#endif /* USE_MSG_MORE */

/*
 * Unless the build defines USE_MSG_NOSIGNAL, MSG_NOSIGNAL is forced to 0 so
 * that OR-ing it into send flags has no effect.  grn_com_init() ignores
 * SIGPIPE instead in that configuration (non-WIN32 builds).
 */
#ifndef USE_MSG_NOSIGNAL
#  ifdef MSG_NOSIGNAL
#    undef MSG_NOSIGNAL
#  endif
#  define MSG_NOSIGNAL 0
#endif /* USE_MSG_NOSIGNAL */
/******* grn_com_queue ********/

/*
 * Append entry `e` to the end of queue `q`.
 *
 * The whole update (link the entry, move the tail pointer, bump the size)
 * is performed inside the queue's critical section.  `ctx` is not used.
 * Always returns GRN_SUCCESS.
 */
grn_rc
grn_com_queue_enque(grn_ctx *ctx, grn_com_queue *q, grn_com_queue_entry *e)
{
  CRITICAL_SECTION_ENTER(q->cs);
  {
    /* Slot that currently terminates the list; the new entry goes there. */
    grn_com_queue_entry **last_link = q->tail;

    e->next = NULL;
    *last_link = e;
    q->tail = &(e->next);
    q->size += 1;
  }
  CRITICAL_SECTION_LEAVE(q->cs);
  return GRN_SUCCESS;
}

/*
 * Remove and return the first entry of queue `q`, or NULL when the queue
 * is empty.
 *
 * The removal is performed inside the queue's critical section.  When the
 * last entry is removed the tail pointer is reset to point at `q->next`
 * so that the next enqueue links directly from the queue head.
 * `ctx` is not used.
 */
grn_com_queue_entry *
grn_com_queue_deque(grn_ctx *ctx, grn_com_queue *q)
{
  grn_com_queue_entry *head;

  CRITICAL_SECTION_ENTER(q->cs);
  head = q->next;
  if (head != NULL) {
    q->next = head->next;
    if (q->next == NULL) {
      q->tail = &q->next;
    }
    q->size--;
  }
  CRITICAL_SECTION_LEAVE(q->cs);

  return head;
}

/*
 * Return the number of entries currently recorded in `q->size`.
 *
 * The value is read without entering the queue's critical section.
 * `ctx` is not used.
 */
uint64_t
grn_com_queue_size(grn_ctx *ctx, grn_com_queue *q)
{
  return q->size;
}

/* File-scope pointer to the "current" job queue; NULL until one is set. */
static grn_com_queue *grn_job_queue_current = NULL;

/*
 * Record `queue` as the current job queue (may be NULL).
 * `ctx` is not used.
 */
void
grn_job_queue_current_set(grn_ctx *ctx, grn_com_queue *queue)
{
  grn_job_queue_current = queue;
}

/*
 * Return the queue last stored by grn_job_queue_current_set(), or NULL
 * when none has been set.  `ctx` is not used.
 */
grn_com_queue *
grn_job_queue_current_get(grn_ctx *ctx)
{
  return grn_job_queue_current;
}

/******* grn_msg ********/

/*
 * Obtain a message object bound to peer `com`.
 *
 * If `old` is given and holds a recycled message, that message is dequeued,
 * rewound and reused; it must have been created by the same `ctx`,
 * otherwise GRN_INVALID_ARGUMENT is reported and NULL is returned.
 * Otherwise a new zeroed grn_msg is allocated and initialized as a
 * GRN_MSG bulk of GRN_DB_TEXT owned by `ctx`.
 *
 * In both cases the queue link, peer, `old` queue and header are reset
 * before returning.
 *
 * Returns the message as grn_obj *, or NULL on ctx mismatch or when the
 * allocation fails.
 */
grn_obj *
grn_msg_open(grn_ctx *ctx, grn_com *com, grn_com_queue *old)
{
  grn_msg *msg = NULL;
  if (old && (msg = (grn_msg *)grn_com_queue_deque(ctx, old))) {
    if (msg->ctx != ctx) {
      /* NOTE(review): the dequeued message is not put back on `old` here. */
      ERR(GRN_INVALID_ARGUMENT, "ctx unmatch");
      return NULL;
    }
    GRN_BULK_REWIND(&msg->qe.obj);
  } else if ((msg = GRN_CALLOC(sizeof(grn_msg)))) {
    GRN_OBJ_INIT(&msg->qe.obj, GRN_MSG, 0, GRN_DB_TEXT);
    msg->qe.obj.header.impl_flags |= GRN_OBJ_ALLOCATED;
    msg->ctx = ctx;
  } else {
    /* Allocation failed: nothing to initialize, so do not touch `msg`. */
    return NULL;
  }
  msg->qe.next = NULL;
  msg->u.peer = com;
  msg->old = old;
  memset(&msg->header, 0, sizeof(grn_com_header));
  return (grn_obj *)msg;
}

/*
 * Open a message for replying to `query`.
 *
 * The reply is addressed to the request's peer, inherits its edge id, and
 * uses the request's protocol, except that GRN_COM_PROTO_MBREQ is answered
 * with GRN_COM_PROTO_MBRES.
 *
 * Returns NULL when `query` is NULL or when grn_msg_open() returns NULL.
 */
grn_obj *
grn_msg_open_for_reply(grn_ctx *ctx, grn_obj *query, grn_com_queue *old)
{
  grn_msg *request = (grn_msg *)query;
  grn_msg *reply;

  if (!request) {
    return NULL;
  }
  reply = (grn_msg *)grn_msg_open(ctx, request->u.peer, old);
  if (!reply) {
    return NULL;
  }

  reply->edge_id = request->edge_id;
  if (request->header.proto == GRN_COM_PROTO_MBREQ) {
    reply->header.proto = GRN_COM_PROTO_MBRES;
  } else {
    reply->header.proto = request->header.proto;
  }
  return (grn_obj *)reply;
}

/*
 * Release message `obj`.
 *
 * A message is closed with grn_obj_close() only by the context that created
 * it.  When called from any other context the message is instead appended
 * to its `old` queue.
 */
grn_rc
grn_msg_close(grn_ctx *ctx, grn_obj *obj)
{
  grn_msg *msg = (grn_msg *)obj;

  if (msg->ctx != ctx) {
    return grn_com_queue_enque(ctx, msg->old, (grn_com_queue_entry *)msg);
  }
  return grn_obj_close(ctx, obj);
}

/*
 * Store status, key size and extra size into the header of message `obj`.
 *
 * `status` and `key_size` are converted with htons() before being stored in
 * `header.status` and `header.keylen`; `extra_size` is stored unchanged in
 * `header.level`.  `ctx` is not used.  Always returns GRN_SUCCESS.
 */
grn_rc
grn_msg_set_property(grn_ctx *ctx,
                     grn_obj *obj,
                     uint16_t status,
                     uint16_t key_size,
                     uint8_t extra_size)
{
  grn_msg *msg = (grn_msg *)obj;

  msg->header.status = htons(status);
  msg->header.keylen = htons(key_size);
  msg->header.level = extra_size;
  return GRN_SUCCESS;
}

/*
 * Send `msg` to the peer stored in it (`m->u.peer`).
 *
 * When the peer's pending queue (`peer->new_`) is empty, an immediate send
 * is attempted according to `header->proto`:
 *   - HTTP:  the bulk content is sent as-is with send().
 *   - GQTP:  the header is filled in from `flags` and `ctx`, then the
 *            message is sent with grn_com_send().
 *   - MBRES: the header is used as already set and sent with
 *            grn_com_send(); MSG_MORE is passed when GRN_CTX_MORE is set.
 *   - MBREQ: returns GRN_FUNCTION_NOT_IMPLEMENTED.
 *   - other: returns GRN_INVALID_ARGUMENT.
 * Unless the result is GRN_OPERATION_WOULD_BLOCK, the message is appended
 * to its `old` queue and the result code is returned.
 *
 * Otherwise (pending queue not empty, or the send would block) the message
 * is appended to `peer->new_` under `peer->ev->critical_section` and
 * `peer->ev->cond` is signaled; the enqueue result is returned.
 */
grn_rc
grn_msg_send(grn_ctx *ctx, grn_obj *msg, int flags)
{
  grn_rc rc;
  grn_msg *m = (grn_msg *)msg;
  grn_com *peer = m->u.peer;
  grn_com_header *header = &m->header;
  if (GRN_COM_QUEUE_EMPTYP(&peer->new_)) {
    switch (header->proto) {
    case GRN_COM_PROTO_HTTP:
      {
        ssize_t ret;
        ret =
          send(peer->fd, GRN_BULK_HEAD(msg), GRN_BULK_VSIZE(msg), MSG_NOSIGNAL);
        if (ret == -1) {
          SOERR("send");
        }
        /* The decision is based on ctx->rc, not on the send() return value. */
        if (ctx->rc != GRN_OPERATION_WOULD_BLOCK) {
          grn_com_queue_enque(ctx, m->old, (grn_com_queue_entry *)msg);
          return ctx->rc;
        }
      }
      break;
    case GRN_COM_PROTO_GQTP:
      {
        /* GRN_CTX_MORE implies GRN_CTX_QUIET in the flags sent to the peer. */
        if (flags & GRN_CTX_MORE) {
          flags |= GRN_CTX_QUIET;
        }
        if (ctx->stat == GRN_CTX_QUIT) {
          flags |= GRN_CTX_QUIT;
        }
        header->qtype = (uint8_t)ctx->impl->output.type;
        header->keylen = 0;
        header->level = 0;
        header->flags = (uint8_t)flags;
        header->status = htons((uint16_t)ctx->rc);
        header->opaque = 0;
        header->cas = 0;
        // todo : MSG_DONTWAIT
        rc = grn_com_send(ctx,
                          peer,
                          header,
                          GRN_BULK_HEAD(msg),
                          (uint32_t)GRN_BULK_VSIZE(msg),
                          0);
        if (rc != GRN_OPERATION_WOULD_BLOCK) {
          grn_com_queue_enque(ctx, m->old, (grn_com_queue_entry *)msg);
          return rc;
        }
      }
      break;
    case GRN_COM_PROTO_MBREQ:
      /* NOTE(review): the message is not returned to `m->old` on this path. */
      return GRN_FUNCTION_NOT_IMPLEMENTED;
    case GRN_COM_PROTO_MBRES:
      rc = grn_com_send(ctx,
                        peer,
                        header,
                        GRN_BULK_HEAD(msg),
                        (uint32_t)GRN_BULK_VSIZE(msg),
                        (flags & GRN_CTX_MORE) ? MSG_MORE : 0);
      if (rc != GRN_OPERATION_WOULD_BLOCK) {
        grn_com_queue_enque(ctx, m->old, (grn_com_queue_entry *)msg);
        return rc;
      }
      break;
    default:
      /* NOTE(review): the message is not returned to `m->old` on this path. */
      return GRN_INVALID_ARGUMENT;
    }
  }
  /* Deferred delivery: queue the message on the peer and wake a waiter. */
  CRITICAL_SECTION_ENTER(peer->ev->critical_section);
  rc = grn_com_queue_enque(ctx, &peer->new_, (grn_com_queue_entry *)msg);
  COND_SIGNAL(peer->ev->cond);
  CRITICAL_SECTION_LEAVE(peer->ev->critical_section);
  return rc;
}

/******* grn_com ********/

/*
 * One-time initialization of the communication layer.
 *
 * On WIN32 this calls WSAStartup() requesting MAKEWORD(2, 0).  On other
 * platforms, when USE_MSG_NOSIGNAL is not defined (so MSG_NOSIGNAL is 0 in
 * this file), SIGPIPE is set to SIG_IGN instead.
 *
 * Failures are reported against the global context `grn_gctx`.
 * Returns `grn_gctx.rc`.
 */
grn_rc
grn_com_init(void)
{
#ifdef WIN32
  WSADATA wd;
  if (WSAStartup(MAKEWORD(2, 0), &wd) != 0) {
    /* SOERR reports through a variable named `ctx`; bind it to grn_gctx. */
    grn_ctx *ctx = &grn_gctx;
    SOERR("WSAStartup");
  }
#else /* WIN32 */
#  ifndef USE_MSG_NOSIGNAL
  if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
    /* SERR reports through a variable named `ctx`; bind it to grn_gctx. */
    grn_ctx *ctx = &grn_gctx;
    SERR("signal");
  }
#  endif /* USE_MSG_NOSIGNAL */
#endif   /* WIN32 */
  return grn_gctx.rc;
}

/*
 * Tear down the communication layer.
 *
 * On WIN32 this calls WSACleanup(); on other platforms it does nothing.
 */
void
grn_com_fin(void)
{
#ifdef WIN32
  WSACleanup();
#endif /* WIN32 */
}

/*
 * Initialize event loop state `ev`.
 *
 * Creates the hash keyed by grn_sock with `data_size` bytes per value,
 * initializes the critical section, condition variable and recycle queue,
 * and resets the handler, edge id, acceptor, listen backlog (to
 * GRN_COM_EVENT_LISTEN_BACKLOG_DEFAULT) and opaque pointer.
 *
 * Unless USE_SELECT is defined, an event array of `max_nevents` elements is
 * allocated for the selected backend (epoll, kqueue or poll) and, for
 * epoll/kqueue, the backend descriptor is created.  If that fails the hash
 * is closed and `ev->hash` / `ev->events` are set to NULL.
 *
 * Returns `ctx->rc`.
 */
grn_rc
grn_com_event_init(grn_ctx *ctx,
                   grn_com_event *ev,
                   int max_nevents,
                   int data_size)
{
  ev->max_nevents = max_nevents;
  if ((ev->hash = grn_hash_create(ctx,
                                  NULL,
                                  sizeof(grn_sock),
                                  (unsigned int)data_size,
                                  0))) {
    CRITICAL_SECTION_INIT(ev->critical_section);
    COND_INIT(ev->cond);
    GRN_COM_QUEUE_INIT(&ev->recv_old);
    ev->msg_handler = NULL;
    memset(&(ev->curr_edge_id), 0, sizeof(grn_com_addr));
    ev->acceptor = NULL;
    ev->listen_backlog = GRN_COM_EVENT_LISTEN_BACKLOG_DEFAULT;
    ev->opaque = NULL;
#ifndef USE_SELECT
#  ifdef USE_EPOLL
    if ((ev->events =
           GRN_MALLOC(sizeof(struct epoll_event) * (size_t)max_nevents))) {
      if ((ev->epfd = epoll_create(max_nevents)) != -1) {
        goto exit;
      } else {
        SERR("epoll_create");
      }
      GRN_FREE(ev->events);
    }
#  else /* USE_EPOLL */
#    ifdef USE_KQUEUE
    if ((ev->events = GRN_MALLOC(sizeof(struct kevent) * max_nevents))) {
      if ((ev->kqfd = kqueue()) != -1) {
        goto exit;
      } else {
        SERR("kqueue");
      }
      GRN_FREE(ev->events);
    }
#    else  /* USE_KQUEUE */
    if ((ev->events = GRN_MALLOC(sizeof(struct pollfd) * max_nevents))) {
      goto exit;
    }
#    endif /* USE_KQUEUE*/
#  endif   /* USE_EPOLL */
    /* Reached only when backend setup failed: undo the hash creation. */
    /* NOTE(review): the critical section and cond initialized above are */
    /* not finalized on this path.                                       */
    grn_hash_close(ctx, ev->hash);
    ev->hash = NULL;
    ev->events = NULL;
#else  /* USE_SELECT */
    goto exit;
#endif /* USE_SELECT */
  }
exit:
  return ctx->rc;
}

/*
 * Release the resources held by event loop state `ev`.
 *
 * Closes every message left on the recycle queue, closes the hash, frees
 * the event array and closes the epoll/kqueue descriptor (when those
 * backends are compiled in), then finalizes the critical section.
 *
 * Always returns GRN_SUCCESS.
 */
grn_rc
grn_com_event_fin(grn_ctx *ctx, grn_com_event *ev)
{
  grn_obj *msg;
  while ((msg = (grn_obj *)grn_com_queue_deque(ctx, &ev->recv_old))) {
    grn_msg_close(ctx, msg);
  }
  if (ev->hash) {
    grn_hash_close(ctx, ev->hash);
  }
#ifndef USE_SELECT
  if (ev->events) {
    GRN_FREE(ev->events);
  }
#  ifdef USE_EPOLL
  grn_close(ev->epfd);
#  endif /* USE_EPOLL */
#  ifdef USE_KQUEUE
  grn_close(ev->kqfd);
#  endif /* USE_KQUEUE*/
#endif   /* USE_SELECT */
  /* NOTE(review): ev->cond (set up by COND_INIT in grn_com_event_init) is */
  /* not finalized here — confirm whether it needs a matching finalizer.   */
  CRITICAL_SECTION_FIN(ev->critical_section);
  return GRN_SUCCESS;
}

/*
 * Set the backlog value that is later passed to listen() for this event
 * loop's acceptor.  `ctx` is not used.
 */
void
grn_com_event_set_listen_backlog(grn_ctx *ctx, grn_com_event *ev, int backlog)
{
  ev->listen_backlog = backlog;
}

/*
 * Return the backlog value currently stored in `ev->listen_backlog`.
 * `ctx` is not used.
 */
int
grn_com_event_get_listen_backlog(grn_ctx *ctx, grn_com_event *ev)
{
  return ev->listen_backlog;
}

/*
 * Register socket `fd` with event loop `ev` for the given `events`.
 *
 * Returns GRN_INVALID_ARGUMENT when `ev` is NULL or when the hash already
 * holds `ev->max_nevents` entries (the latter is also logged).  Otherwise
 * the descriptor is registered with epoll/kqueue (when compiled in) and a
 * hash entry keyed by `fd` is added; the entry's `ev`, `fd` and `events`
 * fields are filled in and, if `com` is not NULL, `*com` receives the entry.
 *
 * Returns `ctx->rc` after the registration attempt.
 */
grn_rc
grn_com_event_add(
  grn_ctx *ctx, grn_com_event *ev, grn_sock fd, int events, grn_com **com)
{
  grn_com *c;
  /* todo : expand events */
  if (!ev || *ev->hash->n_entries == (uint32_t)(ev->max_nevents)) {
    if (ev) {
      GRN_LOG(ctx, GRN_LOG_ERROR, "too many connections (%d)", ev->max_nevents);
    }
    return GRN_INVALID_ARGUMENT;
  }
#ifdef USE_EPOLL
  {
    struct epoll_event e;
    memset(&e, 0, sizeof(struct epoll_event));
    e.data.fd = (fd);
    e.events = (uint32_t)events;
    if (epoll_ctl(ev->epfd, EPOLL_CTL_ADD, (fd), &e) == -1) {
      SERR("epoll_ctl");
      return ctx->rc;
    }
  }
#endif /* USE_EPOLL*/
#ifdef USE_KQUEUE
  {
    struct kevent e;
    /* todo: udata should have fd */
    EV_SET(&e, (fd), events, EV_ADD, 0, 0, NULL);
    if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) {
      SERR("kevent");
      return ctx->rc;
    }
  }
#endif /* USE_KQUEUE */
  {
    /* NOTE(review): if grn_hash_add fails, `*com` is left unwritten and  */
    /* the epoll/kqueue registration made above is not rolled back here. */
    if (grn_hash_add(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c, NULL)) {
      c->ev = ev;
      c->fd = fd;
      c->events = events;
      if (com) {
        *com = c;
      }
    }
  }
  return ctx->rc;
}

/*
 * Change the event mask of an already registered socket `fd`.
 *
 * Looks `fd` up in `ev->hash`.  If found, `*com` (when `com` is not NULL)
 * receives the entry and, when the requested `events` differ from the
 * stored ones, the epoll/kqueue registration is updated and the new mask
 * is stored in the entry.
 *
 * Returns:
 *   GRN_SUCCESS           on success (including "no change needed"),
 *   GRN_INVALID_ARGUMENT  when `ev` is NULL or `fd` is not registered,
 *   GRN_OBJECT_CORRUPT    when the entry's stored fd differs from `fd`,
 *   ctx->rc               when the epoll_ctl()/kevent() call fails.
 */
grn_rc
grn_com_event_mod(
  grn_ctx *ctx, grn_com_event *ev, grn_sock fd, int events, grn_com **com)
{
  grn_com *c;
  if (!ev) {
    return GRN_INVALID_ARGUMENT;
  }
  if (grn_hash_get(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c)) {
    if (c->fd != fd) {
      GRN_LOG(ctx,
              GRN_LOG_ERROR,
              "grn_com_event_mod fd unmatch "
              "%" GRN_FMT_SOCKET " != %" GRN_FMT_SOCKET,
              c->fd,
              fd);
      return GRN_OBJECT_CORRUPT;
    }
    if (com) {
      *com = c;
    }
    if (c->events != events) {
#ifdef USE_EPOLL
      struct epoll_event e;
      memset(&e, 0, sizeof(struct epoll_event));
      e.data.fd = (fd);
      e.events = (uint32_t)events;
      if (epoll_ctl(ev->epfd, EPOLL_CTL_MOD, (fd), &e) == -1) {
        SERR("epoll_ctl");
        return ctx->rc;
      }
#endif /* USE_EPOLL*/
#ifdef USE_KQUEUE
      // experimental
      /* Submits a delete for the in/out filter followed by an add of the */
      /* new filter in a single kevent() call.                            */
      struct kevent e[2];
      EV_SET(&e[0],
             (fd),
             GRN_COM_POLLIN | GRN_COM_POLLOUT,
             EV_DELETE,
             0,
             0,
             NULL);
      EV_SET(&e[1], (fd), events, EV_ADD, 0, 0, NULL);
      if (kevent(ev->kqfd, e, 2, NULL, 0, NULL) == -1) {
        SERR("kevent");
        return ctx->rc;
      }
#endif /* USE_KQUEUE */
      c->events = events;
    }
    return GRN_SUCCESS;
  }
  return GRN_INVALID_ARGUMENT;
}

/*
 * Unregister socket `fd` from event loop `ev`.
 *
 * When the entry exists, it is removed from epoll/kqueue (when compiled in)
 * and then deleted from `ev->hash`; the result of the hash deletion is
 * returned.  If the epoll_ctl()/kevent() call fails, `ctx->rc` is returned
 * and the hash entry is kept.
 *
 * Returns GRN_INVALID_ARGUMENT when `ev` is NULL or `fd` is not registered
 * (the latter is also logged).
 */
grn_rc
grn_com_event_del(grn_ctx *ctx, grn_com_event *ev, grn_sock fd)
{
  if (!ev) {
    return GRN_INVALID_ARGUMENT;
  }
  {
    grn_com *c;
    grn_id id = grn_hash_get(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c);
    if (id) {
#ifdef USE_EPOLL
      /* Skipped for entries already marked closed — presumably because a */
      /* closed descriptor is no longer registered with epoll; confirm.   */
      if (!c->closed) {
        struct epoll_event e;
        memset(&e, 0, sizeof(struct epoll_event));
        e.data.fd = fd;
        e.events = (uint32_t)(c->events);
        if (epoll_ctl(ev->epfd, EPOLL_CTL_DEL, fd, &e) == -1) {
          SERR("epoll_ctl");
          return ctx->rc;
        }
      }
#endif /* USE_EPOLL*/
#ifdef USE_KQUEUE
      struct kevent e;
      EV_SET(&e, (fd), c->events, EV_DELETE, 0, 0, NULL);
      if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) {
        SERR("kevent");
        return ctx->rc;
      }
#endif /* USE_KQUEUE */
      return grn_hash_delete_by_id(ctx, ev->hash, id, NULL);
    } else {
      GRN_LOG(ctx,
              GRN_LOG_ERROR,
              "%04x| fd(%" GRN_FMT_SOCKET ") not found in ev(%p)",
              grn_getpid(),
              fd,
              ev);
      return GRN_INVALID_ARGUMENT;
    }
  }
}

/*
 * Resume accepting connections on the event loop's acceptor socket.
 *
 * Does nothing (and returns the current `ctx->rc`) when the acceptor is
 * already accepting.  Otherwise the acceptor is switched back to
 * GRN_COM_POLLIN and listen() is called with `ev->listen_backlog`; the
 * `accepting` flag is set only when both steps succeed.
 *
 * `ev->acceptor` must not be NULL.  Returns `ctx->rc`.
 */
grn_rc
grn_com_event_start_accept(grn_ctx *ctx, grn_com_event *ev)
{
  grn_com *acceptor = ev->acceptor;

  if (acceptor->accepting) {
    return ctx->rc;
  }

  GRN_API_ENTER;
  if (!grn_com_event_mod(ctx, ev, acceptor->fd, GRN_COM_POLLIN, NULL)) {
    if (listen(acceptor->fd, ev->listen_backlog) != 0) {
      SOERR("listen - start accept");
    } else {
      acceptor->accepting = true;
    }
  }
  GRN_API_RETURN(ctx->rc);
}

/*
 * Stop accepting connections on the event loop's acceptor socket.
 *
 * Does nothing (and returns the current `ctx->rc`) when the acceptor is
 * not accepting.  Otherwise the acceptor's event mask is cleared and
 * listen() is called with a backlog of 0; the `accepting` flag is cleared
 * only when both steps succeed.
 *
 * `ev->acceptor` must not be NULL.  Returns `ctx->rc`.
 */
grn_rc
grn_com_event_stop_accept(grn_ctx *ctx, grn_com_event *ev)
{
  grn_com *acceptor = ev->acceptor;

  if (!acceptor->accepting) {
    return ctx->rc;
  }

  GRN_API_ENTER;
  if (!grn_com_event_mod(ctx, ev, acceptor->fd, 0, NULL)) {
    if (listen(acceptor->fd, 0) != 0) {
      SOERR("listen - disable accept");
    } else {
      acceptor->accepting = false;
    }
  }
  GRN_API_RETURN(ctx->rc);
}

/*
 * Handle a readable event on `com`.
 *
 * For the acceptor socket: accept one connection and register it with the
 * event loop for GRN_COM_POLLIN.  If accept() fails with EMFILE, accepting
 * is paused via grn_com_event_stop_accept(); other failures are reported.
 * If registration fails the new socket is closed.
 *
 * For any other socket: obtain a message (recycled from `ev->recv_old` when
 * possible), read one request into it with grn_com_recv(), stamp it with the
 * edge id / session id of the connection and hand it to `ev->msg_handler`.
 * If no message object can be obtained the event is dropped.
 *
 * The context's error state is cleared on entry.
 */
static void
grn_com_receiver(grn_ctx *ctx, grn_com *com)
{
  grn_com_event *ev = com->ev;
  ERRCLR(ctx);
  if (ev->acceptor == com) {
    grn_com *ncs;
    grn_sock fd = accept(com->fd, NULL, NULL);
    if (fd == -1) {
      if (errno == EMFILE) {
        grn_com_event_stop_accept(ctx, ev);
      } else {
        SOERR("accept");
      }
      return;
    }
    if (grn_com_event_add(ctx, ev, fd, GRN_COM_POLLIN, (grn_com **)&ncs)) {
      grn_sock_close(fd);
      return;
    }
    ncs->has_sid = 0;
    ncs->closed = 0;
    ncs->opaque = NULL;
    GRN_COM_QUEUE_INIT(&ncs->new_);
    // GRN_LOG(ctx, GRN_LOG_NOTICE, "accepted (%d)", fd);
    return;
  } else {
    grn_msg *msg = (grn_msg *)grn_msg_open(ctx, com, &ev->recv_old);
    if (!msg) {
      /* grn_msg_open() returns NULL (e.g. on "ctx unmatch"); there is no */
      /* message to fill in or dispatch, so give up on this event.        */
      return;
    }
    /* The result of grn_com_recv() is left in ctx->rc for the handler. */
    grn_com_recv(ctx, msg->u.peer, &msg->header, (grn_obj *)msg);
    if (msg->u.peer /* is_edge_request(msg)*/) {
      grn_memcpy(&msg->edge_id, &ev->curr_edge_id, sizeof(grn_com_addr));
      if (!com->has_sid) {
        /* First message on this connection: assign the next session id. */
        com->has_sid = 1;
        com->sid = ev->curr_edge_id.sid++;
      }
      msg->edge_id.sid = com->sid;
    }
    msg->acceptor = ev->acceptor;
    ev->msg_handler(ctx, (grn_obj *)msg);
  }
}

/*
 * Wait for socket events for up to `timeout` milliseconds and dispatch
 * every readable socket to grn_com_receiver().
 *
 * A negative `timeout` waits without a time limit.  The backend is chosen
 * at compile time: select() (USE_SELECT), epoll (USE_EPOLL), kqueue
 * (USE_KQUEUE) or poll() otherwise.  The context's error state is reset
 * before waiting.
 *
 * Returns `ctx->rc` when the wait call itself fails (an interrupted call is
 * cleared first), otherwise GRN_SUCCESS.
 */
grn_rc
grn_com_event_poll(grn_ctx *ctx, grn_com_event *ev, int timeout)
{
  int nevents;
  grn_com *com;
#ifdef USE_SELECT
  uint32_t dummy;
  grn_sock *pfd;
  int nfds = 0;
  fd_set rfds;
  fd_set wfds;
  struct timeval tv;
  if (timeout >= 0) {
    tv.tv_sec = timeout / 1000;
    tv.tv_usec = (timeout % 1000) * 1000;
  }
  FD_ZERO(&rfds);
  FD_ZERO(&wfds);
  ctx->errlvl = GRN_OK;
  ctx->rc = GRN_SUCCESS;
  {
    /* Build the read/write sets from every registered socket. */
    grn_hash_cursor *cursor;
    cursor = grn_hash_cursor_open(ctx, ev->hash, NULL, 0, NULL, 0, 0, -1, 0);
    if (cursor) {
      grn_id id;
      while ((id = grn_hash_cursor_next(ctx, cursor))) {
        grn_hash_cursor_get_key_value(ctx,
                                      cursor,
                                      (void **)(&pfd),
                                      &dummy,
                                      (void **)(&com));
        if (com->events & GRN_COM_POLLIN) {
          FD_SET(*pfd, &rfds);
        }
        if (com->events & GRN_COM_POLLOUT) {
          FD_SET(*pfd, &wfds);
        }
#  ifndef WIN32
        if (*pfd > nfds) {
          nfds = *pfd;
        }
#  endif /* WIN32 */
      }
      grn_hash_cursor_close(ctx, cursor);
    }
  }
  nevents = select(nfds + 1, &rfds, &wfds, NULL, (timeout >= 0) ? &tv : NULL);
  if (nevents < 0) {
    SOERR("select");
    if (ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) {
      ERRCLR(ctx);
    }
    return ctx->rc;
  }
  if (timeout < 0 && !nevents) {
    GRN_LOG(ctx, GRN_LOG_NOTICE, "select returns 0 events");
  }
  GRN_HASH_EACH(ctx, ev->hash, eh, &pfd, &dummy, &com, {
    if (FD_ISSET(*pfd, &rfds)) {
      grn_com_receiver(ctx, com);
    }
  });
#else /* USE_SELECT */
#  ifdef USE_EPOLL
  struct epoll_event *ep;
  ctx->errlvl = GRN_OK;
  ctx->rc = GRN_SUCCESS;
  nevents = epoll_wait(ev->epfd, ev->events, ev->max_nevents, timeout);
  if (nevents < 0) {
    SERR("epoll_wait");
  }
#  else /* USE_EPOLL */
#    ifdef USE_KQUEUE
  struct kevent *ep;
  struct timespec tv;
  if (timeout >= 0) {
    /* timeout is in milliseconds; tv_nsec is in nanoseconds. */
    tv.tv_sec = timeout / 1000;
    tv.tv_nsec = (timeout % 1000) * 1000000;
  }
  /* Reset the error state like the other backends do before waiting. */
  ctx->errlvl = GRN_OK;
  ctx->rc = GRN_SUCCESS;
  /* A NULL timespec makes kevent() wait without a time limit. */
  nevents = kevent(ev->kqfd,
                   NULL,
                   0,
                   ev->events,
                   ev->max_nevents,
                   (timeout >= 0) ? &tv : NULL);
  if (nevents < 0) {
    SERR("kevent");
  }
#    else  /* USE_KQUEUE */
  uint32_t dummy;
  int nfd = 0, *pfd;
  struct pollfd *ep = ev->events;
  ctx->errlvl = GRN_OK;
  ctx->rc = GRN_SUCCESS;
  /* Fill the pollfd array with every registered socket. */
  GRN_HASH_EACH(ctx, ev->hash, eh, &pfd, &dummy, &com, {
    ep->fd = *pfd;
    //    ep->events =(short) com->events;
    ep->events = POLLIN;
    ep->revents = 0;
    ep++;
    nfd++;
  });
  nevents = poll(ev->events, nfd, timeout);
  if (nevents < 0) {
    SERR("poll");
  }
#    endif /* USE_KQUEUE */
#  endif   /* USE_EPOLL */
  if (ctx->rc != GRN_SUCCESS) {
    if (ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) {
      ERRCLR(ctx);
    }
    return ctx->rc;
  }
  if (timeout < 0 && !nevents) {
    GRN_LOG(ctx, GRN_LOG_NOTICE, "poll returns 0 events");
  }
  for (ep = ev->events; nevents; ep++) {
    int efd;
#  ifdef USE_EPOLL
    efd = ep->data.fd;
    nevents--;
    // todo : com = ep->data.ptr;
    if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) {
      /* Unknown descriptor: drop it from epoll and close it. */
      struct epoll_event e;
      GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->hash", efd);
      memset(&e, 0, sizeof(struct epoll_event));
      e.data.fd = efd;
      e.events = ep->events;
      if (epoll_ctl(ev->epfd, EPOLL_CTL_DEL, efd, &e) == -1) {
        SERR("epoll_ctl");
      }
      if (grn_sock_close(efd) == -1) {
        SOERR("close");
      }
      continue;
    }
    if (ep->events & GRN_COM_POLLIN) {
      grn_com_receiver(ctx, com);
    }
#  else /* USE_EPOLL */
#    ifdef USE_KQUEUE
    efd = ep->ident;
    nevents--;
    // todo : com = ep->udata;
    if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) {
      /* Unknown descriptor: drop it from kqueue and close it. */
      struct kevent e;
      GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->set", efd);
      EV_SET(&e, efd, ep->filter, EV_DELETE, 0, 0, NULL);
      if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) {
        SERR("kevent");
      }
      if (grn_sock_close(efd) == -1) {
        SOERR("close");
      }
      continue;
    }
    if (ep->filter == GRN_COM_POLLIN) {
      grn_com_receiver(ctx, com);
    }
#    else
    efd = ep->fd;
    if (!ep->revents) {
      continue;
    }
    /* poll() counts every descriptor whose revents is non-zero, including */
    /* ones that only report POLLERR/POLLHUP/POLLNVAL, so each of them     */
    /* must be counted here to keep `ep` inside the array.                 */
    nevents--;
    if (!(ep->events & ep->revents)) {
      continue;
    }
    if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) {
      GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->hash", efd);
      if (grn_sock_close(efd) == -1) {
        SOERR("close");
      }
      continue;
    }
    if (ep->revents & GRN_COM_POLLIN) {
      grn_com_receiver(ctx, com);
    }
#    endif /* USE_KQUEUE */
#  endif   /* USE_EPOLL */
  }
#endif     /* USE_SELECT */
  /* todo :
  while (!(msg = (grn_com_msg *)grn_com_queue_deque(&recv_old))) {
    grn_msg_close(ctx, msg);
  }
  */
  return GRN_SUCCESS;
}

/*
 * Send a minimal HTTP/1.0 GET request for `path` (of `path_len` bytes) over
 * connection `cs`.
 *
 * The request text is "GET <path> HTTP/1.0\r\n\r\n" and is sent with a
 * single send() call using MSG_NOSIGNAL combined with `flags`.  A send
 * failure is reported through the context; a byte count that differs from
 * the request size is logged at notice level.
 *
 * Returns `ctx->rc`.
 */
grn_rc
grn_com_send_http(
  grn_ctx *ctx, grn_com *cs, const char *path, uint32_t path_len, int flags)
{
  grn_obj request;
  ssize_t n_sent;

  GRN_TEXT_INIT(&request, 0);
  GRN_TEXT_PUTS(ctx, &request, "GET ");
  grn_bulk_write(ctx, &request, path, path_len);
  GRN_TEXT_PUTS(ctx, &request, " HTTP/1.0\r\n\r\n");

  // todo : refine
  n_sent = send(cs->fd,
                GRN_BULK_HEAD(&request),
                GRN_BULK_VSIZE(&request),
                MSG_NOSIGNAL | flags);
  if (n_sent == -1) {
    SOERR("send");
  }
  if ((size_t)n_sent != GRN_BULK_VSIZE(&request)) {
    GRN_LOG(ctx,
            GRN_LOG_NOTICE,
            "send %d != %d",
            (int)n_sent,
            (int)GRN_BULK_VSIZE(&request));
  }

  grn_obj_close(ctx, &request);
  return ctx->rc;
}

/*
 * Send `header` followed by `size` bytes of `body` over connection `cs`.
 *
 * `header->size` is set to htonl(size) before sending.  When there is a
 * body, header and body are sent together with one scatter/gather call
 * (WSASend on WIN32, sendmsg elsewhere); when `size` is 0 only the header is
 * sent with send().  `flags` is combined with MSG_NOSIGNAL on the
 * send()/sendmsg() paths (the WSASend path passes 0 for flags).
 *
 * Returns GRN_SUCCESS when everything was sent; otherwise the error
 * recorded in `ctx->rc`.  A short or failed send is logged.
 */
grn_rc
grn_com_send(grn_ctx *ctx,
             grn_com *cs,
             grn_com_header *header,
             const char *body,
             uint32_t size,
             int flags)
{
  grn_rc rc = GRN_SUCCESS;
  size_t whole_size = sizeof(grn_com_header) + size;
  ssize_t ret;
  header->size = htonl(size);
  GRN_LOG(ctx,
          GRN_LOG_INFO,
          "send (%d,%x,%d,%02x,%02x,%04x)",
          size,
          header->flags,
          header->proto,
          header->qtype,
          header->level,
          header->status);

  if (size) {
#ifdef WIN32
    WSABUF wsabufs[2];
    DWORD n_sent = 0;
    wsabufs[0].buf = (char *)header;
    wsabufs[0].len = sizeof(grn_com_header);
    wsabufs[1].buf = (char *)body;
    wsabufs[1].len = size;
    if (WSASend(cs->fd, wsabufs, 2, &n_sent, 0, NULL, NULL) == SOCKET_ERROR) {
      SOERR("WSASend");
      rc = ctx->rc;
      /* Do not trust n_sent after a failure; mirror the POSIX branch. */
      ret = -1;
    } else {
      ret = (ssize_t)n_sent;
    }
#else  /* WIN32 */
    struct iovec msg_iov[2];
    struct msghdr msg;
    memset(&msg, 0, sizeof(struct msghdr));
    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = msg_iov;
    msg.msg_iovlen = 2;
    msg_iov[0].iov_base = header;
    msg_iov[0].iov_len = sizeof(grn_com_header);
    msg_iov[1].iov_base = (char *)body;
    msg_iov[1].iov_len = size;
    if ((ret = sendmsg(cs->fd, &msg, MSG_NOSIGNAL | flags)) == -1) {
      SOERR("sendmsg");
      rc = ctx->rc;
    }
#endif /* WIN32 */
  } else {
    /* No body: whole_size equals sizeof(grn_com_header) here. */
    if ((ret = send(cs->fd,
                    (const void *)header,
                    whole_size,
                    MSG_NOSIGNAL | flags)) == -1) {
      SOERR("send");
      rc = ctx->rc;
    }
  }
  if ((size_t)ret != whole_size) {
    GRN_LOG(ctx,
            GRN_LOG_ERROR,
            "sendmsg(%" GRN_FMT_SOCKET "): %" GRN_FMT_LLD " < %" GRN_FMT_LLU,
            cs->fd,
            (long long int)ret,
            (unsigned long long int)whole_size);
    /* NOTE(review): for a short (non-failing) send ctx->rc may still be */
    /* GRN_SUCCESS, so the caller is not told about the truncation.      */
    rc = ctx->rc;
  }
  GRN_QUERY_LOG(ctx,
                GRN_QUERY_LOG_SIZE,
                ":",
                "send(%" GRN_FMT_SIZE "): %" GRN_FMT_SIZE "/%" GRN_FMT_SIZE,
                whole_size,
                ret,
                whole_size);
  return rc;
}

#define RETRY_MAX 10

/*
 * Find the first "\r\n\r\n" sequence in the byte range [p, e).
 *
 * Returns a pointer to the byte just after the sequence, or NULL when the
 * range does not contain it.
 *
 * The scan looks at the last byte of the current 4-byte window first so
 * that it can skip ahead by up to 4 bytes when that byte rules out a match
 * in every overlapping position.
 */
static const char *
scan_delimiter(const char *p, const char *e)
{
  /* Compare the remaining length instead of forming `p + 4`, which may */
  /* point beyond one-past-the-end of the buffer.                       */
  while (e - p >= 4) {
    if (p[3] != '\n') {
      /* A '\r' here may be the 1st or 3rd byte of a match starting later; */
      /* any other byte cannot belong to a match at all.                   */
      p += (p[3] == '\r') ? 1 : 4;
    } else if (p[2] != '\r') {
      /* "?\n" without a preceding '\r': no match can include p[3]. */
      p += 4;
    } else if (p[1] == '\n' && p[0] == '\r') {
      return p + 4;
    } else {
      /* "\r\n" at p[2..3] may be the first half of a match. */
      p += 2;
    }
  }
  return NULL;
}

/* Number of bytes reserved in the buffer for each recv() of text input. */
#define BUFSIZE 4096

/*
 * Receive a text (HTTP) request whose first `ret` bytes have already been
 * read into `header`.
 *
 * Those bytes are copied into `buf`, then more data is read from `com->fd`
 * in BUFSIZE chunks until a "\r\n\r\n" delimiter is seen.  On success the
 * header is rewritten: `qtype` becomes the first byte of the buffer,
 * `proto` becomes GRN_COM_PROTO_HTTP and `size` the number of bytes held in
 * `buf`.
 *
 * recv() results that map to GRN_OPERATION_WOULD_BLOCK or
 * GRN_INTERRUPTED_FUNCTION_CALL are cleared and retried; a recv() returning
 * 0 is retried up to RETRY_MAX times before giving up without touching the
 * header.
 *
 * Returns `ctx->rc`.
 */
static grn_rc
grn_com_recv_text(
  grn_ctx *ctx, grn_com *com, grn_com_header *header, grn_obj *buf, ssize_t ret)
{
  const char *p;
  int retry = 0;
  grn_bulk_write(ctx, buf, (char *)header, (size_t)ret);
  /* The bytes already read may contain the whole request. */
  if ((p = scan_delimiter(GRN_BULK_HEAD(buf), GRN_BULK_CURR(buf)))) {
    header->qtype = (uint8_t)(*GRN_BULK_HEAD(buf));
    header->proto = GRN_COM_PROTO_HTTP;
    header->size = (uint32_t)GRN_BULK_VSIZE(buf);
    goto exit;
  }
  for (;;) {
    if (grn_bulk_reserve(ctx, buf, BUFSIZE)) {
      return ctx->rc;
    }
    if ((ret = recv(com->fd, GRN_BULK_CURR(buf), BUFSIZE, 0)) < 0) {
      SOERR("recv text");
      if (ctx->rc == GRN_OPERATION_WOULD_BLOCK ||
          ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) {
        ERRCLR(ctx);
        continue;
      }
      goto exit;
    }
    if (ret) {
      off_t o = (off_t)GRN_BULK_VSIZE(buf);
      p = GRN_BULK_CURR(buf);
      GRN_BULK_INCR_LEN(buf, ret);
      /* Back up by up to 3 already-buffered bytes so that a delimiter */
      /* split across two reads is still found.                        */
      if (scan_delimiter(p - (o > 3 ? 3 : o), p + ret)) {
        break;
      }
    } else {
      if (++retry > RETRY_MAX) {
        // ERR(GRN_RETRY_MAX, "retry max in recv text");
        goto exit;
      }
    }
  }
  header->qtype = (uint8_t)*GRN_BULK_HEAD(buf);
  header->proto = GRN_COM_PROTO_HTTP;
  header->size = (uint32_t)GRN_BULK_VSIZE(buf);
exit:
  if (header->qtype == 'H') {
    // todo : refine
    /*
    GRN_BULK_REWIND(buf);
    grn_bulk_reserve(ctx, buf, BUFSIZE);
    if ((ret = recv(com->fd, GRN_BULK_CURR(buf), BUFSIZE, 0)) < 0) {
      SOERR("recv text body");
    } else {
      GRN_BULK_CURR(buf) += ret;
    }
    */
  }
  return ctx->rc;
}

/*
 * Receive one message from `com` into `header` and `buf`.
 *
 * First reads sizeof(grn_com_header) bytes into `header`.  As soon as some
 * bytes have arrived and `header->proto` is below 0x80 the input is treated
 * as text and handed to grn_com_recv_text().  Otherwise, for
 * GRN_COM_PROTO_GQTP and GRN_COM_PROTO_MBREQ, `buf` is rewound, grown if
 * needed and filled with ntohl(header->size) bytes of body.  Any other
 * protocol value is logged and reported as GRN_INVALID_FORMAT.
 *
 * recv() results that map to GRN_OPERATION_WOULD_BLOCK or
 * GRN_INTERRUPTED_FUNCTION_CALL are cleared and retried; a recv() returning
 * 0 is retried up to RETRY_MAX times before giving up.
 *
 * Returns `ctx->rc` (or the result of grn_com_recv_text()).
 */
grn_rc
grn_com_recv(grn_ctx *ctx, grn_com *com, grn_com_header *header, grn_obj *buf)
{
  ssize_t ret;
  int retry = 0;
  byte *p = (byte *)header;
  size_t rest = sizeof(grn_com_header);
  do {
    if ((ret = recv(com->fd, p, rest, 0)) < 0) {
      SOERR("recv size");
      GRN_LOG(ctx, GRN_LOG_ERROR, "recv error (%" GRN_FMT_SOCKET ")", com->fd);
      if (ctx->rc == GRN_OPERATION_WOULD_BLOCK ||
          ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) {
        ERRCLR(ctx);
        continue;
      }
      goto exit;
    }
    if (ret) {
      if (header->proto < 0x80) {
        return grn_com_recv_text(ctx, com, header, buf, ret);
      }
      rest -= (size_t)ret;
      p += ret;
    } else {
      if (++retry > RETRY_MAX) {
        // ERR(GRN_RETRY_MAX, "retry max in recv header (%d)", com->fd);
        goto exit;
      }
    }
  } while (rest);
  GRN_LOG(ctx,
          GRN_LOG_INFO,
          "recv (%u,%x,%d,%02x,%02x,%04x)",
          (uint32_t)ntohl(header->size),
          header->flags,
          header->proto,
          header->qtype,
          header->level,
          header->status);
  {
    uint8_t proto = header->proto;
    /* NOTE(review): value_size comes straight from the wire and is used */
    /* to size the buffer without an upper bound.                        */
    size_t value_size = ntohl(header->size);
    GRN_BULK_REWIND(buf);
    switch (proto) {
    case GRN_COM_PROTO_GQTP:
    case GRN_COM_PROTO_MBREQ:
      if (GRN_BULK_WSIZE(buf) < value_size) {
        if (grn_bulk_resize(ctx, buf, value_size)) {
          goto exit;
        }
      }
      retry = 0;
      for (rest = value_size; rest;) {
        if ((ret = recv(com->fd, GRN_BULK_CURR(buf), rest, MSG_WAITALL)) < 0) {
          SOERR("recv body");
          if (ctx->rc == GRN_OPERATION_WOULD_BLOCK ||
              ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) {
            ERRCLR(ctx);
            continue;
          }
          goto exit;
        }
        if (ret) {
          rest -= (size_t)ret;
          GRN_BULK_INCR_LEN(buf, ret);
        } else {
          if (++retry > RETRY_MAX) {
            // ERR(GRN_RETRY_MAX, "retry max in recv body");
            goto exit;
          }
        }
      }
      break;
    default:
      GRN_LOG(ctx, GRN_LOG_ERROR, "illegal header: %d", proto);
      ctx->rc = GRN_INVALID_FORMAT;
      goto exit;
    }
  }
exit:
  return ctx->rc;
}

/*
 * Open a client connection to `dest`:`port`.
 *
 * The destination is resolved with getaddrinfo() (any address family,
 * stream sockets) and each result is tried in order until socket(),
 * the TCP_NODELAY option (when available) and connect() all succeed.
 *
 * On success the context's error state is reset.  When `ev` is given the
 * socket is registered with it for GRN_COM_POLLIN and the registered
 * grn_com is returned; otherwise a zeroed grn_com is allocated and only its
 * `fd` is filled in.
 *
 * Returns NULL when resolution fails, when no address could be connected,
 * or when the grn_com could not be obtained (the socket is closed then).
 */
grn_com *
grn_com_copen(grn_ctx *ctx, grn_com_event *ev, const char *dest, int port)
{
  grn_sock fd = -1;
  grn_com *cs = NULL;

  struct addrinfo hints, *addrinfo_list, *addrinfo_ptr;
  char port_string[16];
  int getaddrinfo_result;

  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
#ifdef AI_NUMERICSERV
  hints.ai_flags = AI_NUMERICSERV;
#endif
  grn_snprintf(port_string,
               sizeof(port_string),
               sizeof(port_string),
               "%d",
               port);

  getaddrinfo_result = getaddrinfo(dest, port_string, &hints, &addrinfo_list);
  if (getaddrinfo_result != 0) {
    switch (getaddrinfo_result) {
#ifdef EAI_MEMORY
    case EAI_MEMORY:
      ERR(GRN_NO_MEMORY_AVAILABLE,
          "getaddrinfo: <%s:%s>: %s",
          dest,
          port_string,
          gai_strerror(getaddrinfo_result));
      break;
#endif
#ifdef EAI_SYSTEM
    case EAI_SYSTEM:
      SOERR("getaddrinfo");
      break;
#endif
    default:
      ERR(GRN_INVALID_ARGUMENT,
          "getaddrinfo: <%s:%s>: %s",
          dest,
          port_string,
          gai_strerror(getaddrinfo_result));
      break;
    }
    return NULL;
  }

  /* Try each resolved address until one connects. */
  for (addrinfo_ptr = addrinfo_list; addrinfo_ptr;
       addrinfo_ptr = addrinfo_ptr->ai_next) {
    fd = socket(addrinfo_ptr->ai_family,
                addrinfo_ptr->ai_socktype,
                addrinfo_ptr->ai_protocol);
    if (fd == GRN_INVALID_SOCKET) {
      SOERR("socket");
      continue;
    }
#ifdef TCP_NODELAY
    {
      static const int value = 1;
      /* Use the same option level as grn_com_sopen() instead of a bare 6. */
      if (setsockopt(fd,
                     SOL_TCP,
                     TCP_NODELAY,
                     (const char *)&value,
                     sizeof(value)) != 0) {
        SOERR("setsockopt");
        grn_sock_close(fd);
        continue;
      }
    }
#endif
    if (connect(fd, addrinfo_ptr->ai_addr, addrinfo_ptr->ai_addrlen) != 0) {
      SOERR("connect");
      grn_sock_close(fd);
      continue;
    }

    break;
  }

  freeaddrinfo(addrinfo_list);

  /* The loop ran off the end of the list: every candidate failed. */
  if (!addrinfo_ptr) {
    return NULL;
  }
  /* Discard errors recorded for candidates that failed before this one. */
  ctx->errlvl = GRN_OK;
  ctx->rc = GRN_SUCCESS;

  if (ev) {
    grn_com_event_add(ctx, ev, fd, GRN_COM_POLLIN, &cs);
  } else {
    cs = GRN_CALLOC(sizeof(grn_com));
    if (cs) {
      cs->fd = fd;
    }
  }
  if (!cs) {
    grn_sock_close(fd);
  }
  return cs;
}

/*
 * Shut down and close the socket of `com`.
 *
 * A shutdown() failure is deliberately ignored.  When closing the socket
 * succeeds the connection is marked closed; otherwise the failure is
 * reported through the context and the flag is left untouched.
 */
void
grn_com_close_(grn_ctx *ctx, grn_com *com)
{
  grn_sock sock = com->fd;

  if (shutdown(sock, SHUT_RDWR) == -1) {
    /* SOERR("shutdown"); */
  }
  if (grn_sock_close(sock) != -1) {
    com->closed = 1;
  } else {
    SOERR("close");
  }
}

/*
 * Close connection `com`.
 *
 * When the connection belongs to an event loop it is first unregistered
 * from it and, if the loop has an acceptor, accepting is (re)started.
 * The socket is then closed unless it is already marked closed.  A
 * connection that does not belong to an event loop is freed afterwards.
 *
 * Always returns GRN_SUCCESS.
 */
grn_rc
grn_com_close(grn_ctx *ctx, grn_com *com)
{
  grn_com_event *event = com->ev;

  if (event) {
    grn_sock sock = com->fd;
    grn_com *listener = event->acceptor;

    grn_com_event_del(ctx, event, sock);
    if (listener) {
      grn_com_event_start_accept(ctx, event);
    }
  }

  if (!com->closed) {
    grn_com_close_(ctx, com);
  }

  if (!event) {
    GRN_FREE(com);
  }
  return GRN_SUCCESS;
}

/*
 * Open a listening (server) socket on `bind_address`:`port`.
 *
 * `bind_address` defaults to "0.0.0.0" when NULL.  The address is resolved
 * with getaddrinfo(); a stream socket is created for the first result,
 * TCP_NODELAY (when available) and SO_REUSEADDR are set, and the socket is
 * bound and put into listening state with `ev->listen_backlog`.
 *
 * The event loop's current edge id is initialized from `he` and `port`, the
 * socket is registered with `ev` as its acceptor and `func` becomes the
 * loop's message handler.
 *
 * On any failure the socket (if one was created) is closed.
 * Returns `ctx->rc`.
 */
grn_rc
grn_com_sopen(grn_ctx *ctx,
              grn_com_event *ev,
              const char *bind_address,
              uint16_t port,
              grn_msg_handler *func,
              struct hostent *he)
{
  grn_sock lfd = -1;
  grn_com *cs = NULL;
  int getaddrinfo_result;
  struct addrinfo *bind_address_info = NULL;
  struct addrinfo hints;
  char port_string[6]; /* ceil(log10(65535)) + 1 ('\0')*/

  GRN_API_ENTER;
  if (!bind_address) {
    bind_address = "0.0.0.0";
  }
  grn_snprintf(port_string,
               sizeof(port_string),
               sizeof(port_string),
               "%u",
               port);
  memset(&hints, 0, sizeof(struct addrinfo));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
#ifdef AI_NUMERICSERV
  hints.ai_flags = AI_NUMERICSERV;
#endif
  getaddrinfo_result =
    getaddrinfo(bind_address, port_string, &hints, &bind_address_info);
  if (getaddrinfo_result != 0) {
    switch (getaddrinfo_result) {
#ifdef EAI_MEMORY
    case EAI_MEMORY:
      ERR(GRN_NO_MEMORY_AVAILABLE,
          "getaddrinfo: <%s:%s>: %s",
          bind_address,
          port_string,
          gai_strerror(getaddrinfo_result));
      break;
#endif
#ifdef EAI_SYSTEM
    case EAI_SYSTEM:
      SOERR("getaddrinfo");
      break;
#endif
    default:
      ERR(GRN_INVALID_ARGUMENT,
          "getaddrinfo: <%s:%s>: %s",
          bind_address,
          port_string,
          gai_strerror(getaddrinfo_result));
      break;
    }
    goto exit;
  }
  if ((lfd = socket(bind_address_info->ai_family, SOCK_STREAM, 0)) == -1) {
    SOERR("socket");
    goto exit;
  }
  /* NOTE(review): `ev` and `he` are dereferenced here and at listen()   */
  /* below, before the `if (ev)` check, so a NULL `ev` or `he` is not    */
  /* actually supported by this function as written.                     */
  grn_memcpy(&ev->curr_edge_id.addr, he->h_addr, (size_t)(he->h_length));
  ev->curr_edge_id.port = htons(port);
  ev->curr_edge_id.sid = 0;
  {
    int v = 1;
#ifdef TCP_NODELAY
    if (setsockopt(lfd, SOL_TCP, TCP_NODELAY, (void *)&v, sizeof(int)) == -1) {
      SOERR("setsockopt");
      goto exit;
    }
#endif
    if (setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, (void *)&v, sizeof(int)) ==
        -1) {
      SOERR("setsockopt");
      goto exit;
    }
  }
  if (bind(lfd, bind_address_info->ai_addr, bind_address_info->ai_addrlen) <
      0) {
    SOERR("bind");
    goto exit;
  }
  if (listen(lfd, ev->listen_backlog) < 0) {
    SOERR("listen");
    goto exit;
  }
  if (ev) {
    if (grn_com_event_add(ctx, ev, lfd, GRN_COM_POLLIN, &cs)) {
      goto exit;
    }
    ev->acceptor = cs;
    ev->msg_handler = func;
    cs->has_sid = 0;
    cs->closed = 0;
    cs->opaque = NULL;
    GRN_COM_QUEUE_INIT(&cs->new_);
  } else {
    /* NOTE(review): a grn_com allocated here is not handed back to the */
    /* caller (only a grn_rc is returned).                              */
    if (!(cs = GRN_CALLOC(sizeof(grn_com)))) {
      goto exit;
    }
    cs->fd = lfd;
  }
  cs->accepting = true;
exit:
  /* Close the socket on failure, but only if one was actually created */
  /* (-1 is the "no socket" marker used above).                        */
  if (!cs && lfd != -1) {
    grn_sock_close(lfd);
  }
  if (bind_address_info) {
    freeaddrinfo(bind_address_info);
  }
  GRN_API_RETURN(ctx->rc);
}

/* Global table of edges, keyed by grn_com_addr; NULL until initialized. */
grn_hash *grn_edges = NULL;
/* Callback invoked by grn_edge_dispatch() after a message is queued. */
void (*grn_dispatcher)(grn_ctx *ctx, grn_edge *edge);

/*
 * Create the global edge table (grn_com_addr keys, grn_edge values) and
 * record `dispatcher` as the global dispatch callback.
 *
 * The result of grn_hash_create() is stored without being checked here.
 */
void
grn_edges_init(grn_ctx *ctx, void (*dispatcher)(grn_ctx *ctx, grn_edge *edge))
{
  grn_edges =
    grn_hash_create(ctx, NULL, sizeof(grn_com_addr), sizeof(grn_edge), 0);
  grn_dispatcher = dispatcher;
}

/*
 * Close the global edge table.
 *
 * The `grn_edges` pointer itself is left unchanged by this function.
 */
void
grn_edges_fin(grn_ctx *ctx)
{
  grn_hash_close(ctx, grn_edges);
}

/*
 * Look up or create the edge for `addr` in the global edge table.
 *
 * The table's io lock is held around the hash operation.  When an entry is
 * obtained its `id` field is set to the hash id.  `added` is passed through
 * to grn_hash_add().
 *
 * Returns the edge, or NULL when the lock cannot be taken.  NULL is also
 * returned when grn_hash_add() yields no entry (assuming it leaves the
 * value pointer untouched on failure — the pointer is pre-set to NULL for
 * that case).
 */
grn_edge *
grn_edges_add(grn_ctx *ctx, grn_com_addr *addr, int *added)
{
  /* Pre-set so that a failed grn_hash_add() cannot leak a garbage pointer. */
  grn_edge *edge = NULL;
  grn_id id;

  if (grn_io_lock(ctx, grn_edges->io, grn_lock_timeout)) {
    return NULL;
  }
  id = grn_hash_add(ctx,
                    grn_edges,
                    addr,
                    sizeof(grn_com_addr),
                    (void **)&edge,
                    added);
  grn_io_unlock(ctx, grn_edges->io);
  if (id) {
    edge->id = id;
  }
  return edge;
}

/*
 * Remove `edge` from the global edge table, using its stored id.
 *
 * The table's io lock is held around the deletion; when the lock cannot be
 * taken nothing is deleted.
 */
void
grn_edges_delete(grn_ctx *ctx, grn_edge *edge)
{
  if (grn_io_lock(ctx, grn_edges->io, grn_lock_timeout)) {
    return;
  }
  grn_hash_delete_by_id(ctx, grn_edges, edge->id, NULL);
  grn_io_unlock(ctx, grn_edges->io);
}

/*
 * Look up or create the communicator edge for `addr`.
 *
 * When the edge is newly created its context and both queues are
 * initialized, it has no connection yet, its state is 0 and it is flagged
 * as GRN_EDGE_COMMUNICATOR.  An existing edge is returned unchanged.
 *
 * Returns the edge, or NULL when grn_edges_add() returns NULL.
 */
grn_edge *
grn_edges_add_communicator(grn_ctx *ctx, grn_com_addr *addr)
{
  /* grn_edges_add() can return before writing `added`; start from 0. */
  int added = 0;
  grn_edge *edge = grn_edges_add(ctx, addr, &added);
  if (edge && added) {
    grn_ctx_init(&edge->ctx, 0);
    GRN_COM_QUEUE_INIT(&edge->recv_new);
    GRN_COM_QUEUE_INIT(&edge->send_old);
    edge->com = NULL;
    edge->stat = 0 /*EDGE_IDLE*/;
    edge->flags = GRN_EDGE_COMMUNICATOR;
  }
  return edge;
}

/*
 * Queue `msg` on the edge's `recv_new` queue and then invoke the global
 * dispatcher callback for that edge.
 *
 * The callback pointer is called without a NULL check.
 */
void
grn_edge_dispatch(grn_ctx *ctx, grn_edge *edge, grn_obj *msg)
{
  grn_com_queue_enque(ctx, &edge->recv_new, (grn_com_queue_entry *)msg);
  grn_dispatcher(ctx, edge);
}
